{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# QuickStart\n",
    "\n",
    "> A versão do Spark que usaremos nas aulas é __1.6.2__, que é a que está disponível neste ambiente.\n",
    "\n",
    "A abstração principal da Spark é uma coleção distribuída de itens chamada Resilient Distributed Dataset (RDD)(_que será detalhada na próxima aula_). Os RDDs podem ser criados a partir de Hadoop InputFormats (como arquivos HDFS) ou transformando outros RDDs. Vamos fazer um novo RDD a partir do dataset de notícias usado no trabalho 2:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "newsTextFile = sc.textFile(\"news.xml\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "> __sc__ é uma variável que este ambiente fornece para acessarmos o [SparkContext](https://spark.apache.org/docs/1.6.3/api/java/org/apache/spark/SparkContext.html).\n",
    "\n",
    "Os RDDs têm [__actions__](http://spark.apache.org/docs/1.6.2/programming-guide.html#actions), que retornam valores, e [__transformations__](http://spark.apache.org/docs/1.6.2/programming-guide.html#transformations), que retornam ponteiros para novos RDDs. Vamos começar com algumas ações:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "newsTextFile.count()  # Número de itens nesse RDD. Como é um RDD de text, incialmente, itens aqui são as linhas."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "newsTextFile.first() # Primeiro item neste RDD"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Mãos na massa!\n",
    "\n",
    "Objetivo dessa prática é fazermos uma contagem de palavras e listar as palavras mais frequentes. Entretanto, queremos fazer isso somente sobre o __texto__ das notícias. Como você percebeu na execução anterior e no trabalho T2, o dataset contém algumas linhas iniciais que não são notícias. O primeiro passo então é filtrar somente as linhas que são notícias."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "def is_news(line):\n",
    "    # Dê um corpo a função abaixo. \n",
    "    # Ela deve receber uma linha e retornar True se for uma notícia ou False caso contrário.\n",
    "    return True\n",
    "    \n",
    "onlyNewsLines = newsTextFile.filter(is_news)\n",
    "\n",
    "onlyNewsLines.take(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Ainda preparando o dado, devemos extrair apenas o texto das notícias e analisar apenas notícias com texto. Então faremos as seguintes transformações:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import lesson1_utils as l1u\n",
    "\n",
    "newsText = onlyNewsLines.map(l1u.extract_text).filter(lambda text: text != None and len(text) > 0)\n",
    "\n",
    "newsText.take(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Neste momento, nosso RDD é composto somente pelos textos das notícias. Analisemos um pouco o RDD com algumas transformações e ações."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# A transformação filter retorna um subconjunto do RDD inicial. \n",
    "# Somente é selecionado o item que obedece, ou seja, retorna True, na função de filtragem.\n",
    "newsWithBrasil = newsText.filter(lambda text: \"Brasil\" in text)\n",
    "\n",
    "print \"Quantidade de notícias que citam o 'Brasil': %d\" % newsWithBrasil.count() "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As ações e transformações de RDD podem ser usadas para computações mais complexas. Vamos dizer que queremos encontrar a notícia com mais palavras:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "newsText.map(lambda text: len( text.split() )).reduce(lambda a, b: a if a > b else b)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A primeira transformação, `map` (transformação), mapeia um texto para um valor inteiro (quantidade de palavras), criando um novo RDD. `reduce` (ação) é chamado em que RDD para encontrar a maior quantidade de palavras. Os argumentos para o `map` e `reduce` são funções anônimas do Python (lambdas), mas também podemos passar qualquer função Python de nível superior que desejamos. Por exemplo, vamos definir uma função `max` para tornar este código mais fácil de entender:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "def max(a, b):\n",
    "    if a > b:\n",
    "        return a\n",
    "    else:\n",
    "        return b\n",
    "    \n",
    "newsText.map(lambda text: len( text.split() )).reduce(max)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Um padrão de fluxo de dados comum é __MapReduce__, como popularizado pelo Hadoop. Com o Spark pode-se implementar MapReduce facilmente, como nesse exemplo de contagem de palavras:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "def toWords(text): return text.split()\n",
    "def one(word): return (word, 1)\n",
    "def sum(a, b): return a + b\n",
    "\n",
    "wordsCounts = newsText.flatMap(toWords).map(one).reduceByKey(sum)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Aqui, combinamos as transformações [`flatMap`](http://spark.apache.org/docs/1.6.2/programming-guide.html#transformations), [`map`](http://spark.apache.org/docs/1.6.2/programming-guide.html#transformations) e [`reduceByKey`](http://spark.apache.org/docs/1.6.3/programming-guide.html#transformations) para calcular as contagens por palavra no arquivo como um RDD de pares `(string, int)`. \n",
    "\n",
    "Podemos usar a ação [`collect`](http://spark.apache.org/docs/1.6.3/programming-guide.html#actions) para ver o resultado."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "sorted(wordsCounts.collect(), key=lambda (_, c): c, reverse=True)[:10]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Bônus\n",
    "\n",
    "Iiii rapaz, só tem stopword entre as mais frequentes. Que tal removê-las e descobrir quais são as palavras mais frequentes nos textos?\n",
    "\n",
    "Use o arquivo __stopwords.pt__ para auxiliar você nesse exercício."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# PASSOS:\n",
    "# 1. Transforme o arquivo stopwords.pt em um a lista python, onde cada linha é um item da lista\n",
    "# 2. Implemente uma função que recebe uma lista de palavras, oriundas do texto de uma notícia, e remova as palvras \n",
    "#    que estão presentes na lista de stopwords montadas no passo 1.\n",
    "# 3. Altere a função toWords, retornando a chamada da função criada no passo 2. recebendo como parâmetro o split \n",
    "#    do texto\n",
    "# 4. Refaça as chamadas: \n",
    "#    4.1.  wordsCounts = newsText.flatMap(toWords).map(one).reduceByKey(sum)\n",
    "#    4.2.  sorted(wordsCounts.collect(), key=lambda (_, c): c, reverse=True)[:10]\n",
    "# E só!"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Pyspark (Py 2)",
   "language": "",
   "name": "pyspark"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
